package com.xiaohu.sink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;
import java.time.ZoneId;

/*
Flink的DataStream API专门提供了向外部写入数据的方法: addSink。
与addSource类似,addSink方法对应着一个"Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;
Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前, Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction (...));
addSink方法同样需要传入一个参数,实现的是SinkFunction接口。
在这个接口中只需要+重写一个方法invoke(,用来将指定的值写入到外部系统中。
这个方法在每条数据记录到来时都会调用。
Flink1.12开始，同样重构了 Sink架构，stream.sinkTo (...)


将数据流写入到文件中，导入依赖：flink-connector-files
 */
public class FileSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); //设置全局并行度1,如果并行度大于1，就会每次有2个文件写入
        // 多个并行度均分范围去生成，例如最大值100，并行度3，三个并行度：0开始  33开始  66开始

        //使用写文件必须开启checkpoint, 设置每多久时间关闭该关闭的文件，否则文件不可读
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); //每2秒保存一次，精确一次


        DataGeneratorSource<String> generatorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number: " + value;
                    }
                },                                                                  // 数据生成器的参数 输入类型固定Long,不能改
                Long.MAX_VALUE,                                                                // 数据生成范围，也可以理解次数0-9
                // 无界流的效果Long.MAX_VALUE
                RateLimiterStrategy.perSecond(100),                   //限速每秒钟生成1条数据
                Types.STRING                                                        // 设置返回类型
        );

        DataStreamSource<String> source = env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // TODO： 输出到文件系统
        // public static <IN> DefaultRowFormatBuilder<IN> forRowFormat( final Path basePath, final Encoder<IN> encoder)
        //TODO:注意：需要在forRowFormat前面指定数据类型！！！！
        FileSink<String> fileSink = FileSink.<String>forRowFormat(new Path("output/file_sink/demo1"), new SimpleStringEncoder<>("UTF-8"))
                .withOutputFileConfig(new OutputFileConfig("xiaohu", "-demo.txt")) // 设置输出文件的一些配置，例如前缀或后缀
//                        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd-HH", ZoneId.of("CTT"))) // 文件分桶，类似于hive分桶，这是分桶规律，可以根据路径分桶，也可以根据时间分桶
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd-HH", ZoneId.systemDefault())) //默认时区
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
//                                .withRolloverInterval(Duration.ofSeconds(10))
                                .withMaxPartSize(MemorySize.parse("10KB"))
                                .build()
                ) //文件滚动策略，可以根据时间或大小生成文件,滚动时间是10s或1M
                .build();

        source.sinkTo(fileSink);


        env.execute();
    }
}
